"A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P if its performance at tasks in T, as measured by P, improves with experience E."
-Tom M. Mitchell
MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this example!), dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs.
It divides into two packages: spark.mllib, which contains the original API built on top of RDDs, and spark.ml, which provides a higher-level API built on top of DataFrames for constructing ML pipelines.
Using spark.ml is recommended because, with DataFrames, the API is more versatile and flexible. spark.mllib will keep being supported alongside the development of spark.ml, so users can continue to use spark.mllib features and expect more to come.
http://spark.apache.org/docs/latest/mllib-guide.html
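As a quick illustration of the two packages (not part of the original notebook), the same ALS algorithm is reachable through both APIs; this tutorial uses the DataFrame-based spark.ml version:

# DataFrame-based API (recommended) -- this is what the rest of the notebook uses
from pyspark.ml.recommendation import ALS

# RDD-based API from the original spark.mllib package, shown only for contrast
from pyspark.mllib.recommendation import ALS as MLlibALS, Rating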
Learn how to create a recommendation engine using the Alternating Least Squares algorithm in Spark's machine learning library
This is a transactional data set containing all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based and registered non-store online retailer. The company mainly sells unique all-occasion gifts. Many customers of the company are wholesalers.
http://archive.ics.uci.edu/ml/datasets/Online+Retail
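For reference (from the UCI page), each row of the CSV has the following columns; the index-based parsing later in the notebook relies on this order:

# Column layout of OnlineRetail.csv (indices used by the parsing code below):
# 0 InvoiceNo, 1 StockCode, 2 Description, 3 Quantity,
# 4 InvoiceDate, 5 UnitPrice, 6 CustomerID, 7 Country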
In [1]:
!rm -f OnlineRetail.csv.gz
!wget https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/OnlineRetail.csv.gz
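The filter in the next cell refers to an RDD named splitColumns that was built in a cell not preserved in this export. A minimal, hypothetical reconstruction (assuming the gzipped CSV is read with sc.textFile, the header row is dropped, and each line is split on commas) would be:

# Hypothetical reconstruction of the missing load/split cell.
# Note: a naive comma split will misparse descriptions that contain commas.
loadRetailData = sc.textFile("OnlineRetail.csv.gz")
header = loadRetailData.first()
splitColumns = loadRetailData.filter(lambda l: l != header).map(lambda l: l.split(","))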
Type:
import re

# Keep only rows with a positive quantity, a stock code containing at least one digit, and a non-empty customer ID.
filteredRetailData = splitColumns.filter(lambda l: int(l[3]) > 0 and len(re.sub(r"\D", "", l[1])) != 0 and len(l[6]) != 0)
Type:
from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

# Build typed Rows, create a DataFrame from them, and register it as a temporary table for SQL queries later on.
retailRows = filteredRetailData.map(lambda l: Row(
    inv=int(l[0]), stockCode=int(re.sub(r"\D", "", l[1])), description=l[2],
    quant=int(l[3]), invDate=l[4], price=float(l[5]), custId=int(l[6]), country=l[7]))
retailDf = sqlContext.createDataFrame(retailRows)
retailDf.registerTempTable("retailPurchases")
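The cells that created the rating column and the data splits are also missing from this export. The code below expects an implicit rating column named purch and three DataFrames called trainDf, cvDf, and testDf; one hypothetical way to produce them (assuming each purchased customer/item pair simply gets purch = 1 and a random 60/20/20 split) is:

from pyspark.sql import functions as F

# Hypothetical reconstruction: one row per (customer, item) pair with an implicit rating of 1.
purchaseDf = retailDf.select("custId", "stockCode").distinct().withColumn("purch", F.lit(1))

# Random 60/20/20 split into training, cross-validation, and test sets.
trainDf, cvDf, testDf = purchaseDf.randomSplit([0.6, 0.2, 0.2], seed=41)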
Latent Factors / rank
The number of columns in the user-feature and product-feature matrices.
Iterations / maxIter
The maximum number of iterations the alternating least squares factorization is run for.
Type:
from pyspark.ml.recommendation import ALS

# Train two models with different hyperparameters so they can be compared:
# a higher-rank model with fewer iterations, and a low-rank model with more iterations.
als1 = ALS(rank=15, maxIter=5, userCol="custId", itemCol="stockCode", ratingCol="purch")
model1 = als1.fit(trainDf)

als2 = ALS(rank=2, maxIter=10, userCol="custId", itemCol="stockCode", ratingCol="purch")
model2 = als2.fit(trainDf)
Use the models to predict how a user would rate a given item. Since every row in the data set represents an actual purchase (purch = 1), the closer a model's prediction is to 1 for an item the user has already purchased, the better.
Some of the users or items in the cross-validation data may not appear in the training data. Let's remove the ones that don't.
Type:
# Keep only cross-validation rows whose customer and stock code also appear in
# the training data, since ALS cannot score users or items it has never seen.
customers = set(trainDf.rdd.map(lambda line: line.custId).collect())
stock = set(trainDf.rdd.map(lambda line: line.stockCode).collect())
filteredCvDf = cvDf.rdd.filter(lambda line: line.stockCode in stock and line.custId in customers).toDF()

print(cvDf.count())
print(filteredCvDf.count())
Type:
# Score the filtered cross-validation set with both models.
predictions1 = model1.transform(filteredCvDf)
predictions2 = model2.transform(filteredCvDf)
Type:
# Compute each model's mean squared error against the actual purchases (purch is always 1).
meanSquaredError1 = predictions1.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError2 = predictions2.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()

print('Mean squared error = %.4f for our first model' % meanSquaredError1)
print('Mean squared error = %.4f for our second model' % meanSquaredError2)
The second model is carried forward as the best model and evaluated in the same way on the held-out test data.
Type:
# Filter the test set the same way as the cross-validation set before scoring.
filteredTestDf = testDf.rdd.filter(lambda line: line.stockCode in stock and line.custId in customers).toDF()

predictions3 = model2.transform(filteredTestDf)
meanSquaredError3 = predictions3.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()

print('Mean squared error = %.4f for our best model' % meanSquaredError3)
Use the best model to predict items the user will be interested in.
Type:
# Build every (custId, stockCode) pair for customer 15544 by cross-joining the
# customer with all known stock codes, so the model can score each candidate item.
userItems = trainDf.filter(trainDf.custId == 15544).select("custId").distinct().join(
    trainDf.select("stockCode").distinct())
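The cells that score these candidate pairs are empty in this export. A hypothetical sketch of the remaining step (scoring every pair with the best model and keeping the highest-rated stock codes for customer 15544) could be:

# Hypothetical reconstruction: rank the candidate items by predicted rating
# and keep the top five recommendations.
userItemRecs = model2.transform(userItems)
topRecs = userItemRecs.sort("prediction", ascending=False).limit(5)
topRecs.show()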
Let's look up the descriptions for this user's recommended product IDs in the retailPurchases table...
In [ ]:
query = """
SELECT
    DISTINCT description
FROM
    retailPurchases
WHERE
    stockCode IN ()
"""
# Paste the recommended stock codes into the IN () list before running the query.
items = sqlContext.sql(query)
print(items.toPandas())
Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17).